-
Notifications
You must be signed in to change notification settings - Fork 138
Bidirectional streaming for pubsub #735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
f64334f to
fbd12a7
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #735 +/- ##
==========================================
- Coverage 86.63% 86.04% -0.59%
==========================================
Files 84 87 +3
Lines 4473 4772 +299
==========================================
+ Hits 3875 4106 +231
- Misses 598 666 +68 ☔ View full report in Codecov by Sentry. |
|
|
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]> fixes typing Signed-off-by: Elena Kolevska <[email protected]> more readable example Signed-off-by: Elena Kolevska <[email protected]> linter Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
This reverts commit cb4b65b. Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
…ew message Signed-off-by: Elena Kolevska <[email protected]>
7ef5a2f to
8c9ce85
Compare
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
|
I am really grateful that somebody is finally working on this! I like how we can choose between subscribe and subscribe_with_handler, even though it's a bit unusual. It's a good approach imo. But from what I understand this doesn't use/support async iterator, which I think would be pretty useful if it did. I am not an expert, but adding an async iteration support should not be such a problem. Here are some code changes that were suggested to me by an AI, but it looks pretty legit:
class Subscription:
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
# ... existing init code ...
self._iterator_active = True
async def __aiter__(self):
"""Make the subscription async iterable."""
return self
async def __anext__(self):
"""Get the next message from the subscription stream."""
if not self._iterator_active:
raise StopAsyncIteration
try:
message = await self.next_message()
if message is None:
# If no message is received, continue iteration
return await self.__anext__()
return message
except StreamInactiveError:
self._iterator_active = False
raise StopAsyncIteration
except StreamCancelledError:
self._iterator_active = False
raise StopAsyncIteration
async def close(self):
"""Close the subscription and stop iteration."""
self._iterator_active = False
if self._stream:
try:
self._stream.cancel()
self._stream_active.clear()
except AioRpcError as e:
if e.code() != StatusCode.CANCELLED:
raise Exception(f'Error while closing stream: {e}')
except Exception as e:
raise Exception(f'Error while closing stream: {e}')
# ... other methods stay unchanged ...
async def subscribe_with_handler(
self,
pubsub_name: str,
topic: str,
handler_fn: Callable[..., Awaitable[TopicEventResponse]],
metadata: Optional[dict] = None,
dead_letter_topic: Optional[str] = None,
) -> Callable[[], Awaitable[None]]:
"""
Subscribe to a topic with a bidirectional stream and a message handler function
Args:
pubsub_name (str): The name of the pubsub component.
topic (str): The name of the topic.
handler_fn (Callable[..., Awaitable[TopicEventResponse]]): The function to call when a message is received.
metadata (Optional[dict]): Additional metadata for the subscription.
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
Returns:
Callable[[], Awaitable[None]]: An async function to close the subscription.
"""
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)
async def stream_messages():
try:
async for message in subscription:
try:
response = await handler_fn(message)
if response:
await subscription.respond(message, response.status)
except Exception as e:
# Log error and continue processing messages
print(f"Error processing message: {e}")
continue
except Exception as e:
print(f"Stream error: {e}")
# Start processing messages in background task
task = asyncio.create_task(stream_messages())
async def close_subscription():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await subscription.close()
return close_subscription
# examples/pubsub-streaming-async/subscriber-iterator.py
import argparse
import asyncio
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
parser = argparse.ArgumentParser()
parser.add_argument('--topic', required=True)
args = parser.parse_args()
async def main():
async with DaprClient() as client:
subscription = await client.subscribe(
pubsub_name='pubsub',
topic=args.topic,
dead_letter_topic=f'{args.topic}_DEAD'
)
try:
# Process messages using async for
async for message in subscription:
print(f'Processing message: {message.data()} from {message.topic()}...')
# Process message here
await subscription.respond_success(message)
except Exception as e:
print(f"Error: {e}")
finally:
await subscription.close()
if __name__ == '__main__':
asyncio.run(main())
# tests/clients/test_dapr_grpc_client_async.py
async def test_subscribe_topic_iterator(self):
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
subscription = await dapr.subscribe(pubsub_name='pubsub', topic='example')
messages = []
async for message in subscription:
messages.append(message)
await subscription.respond_success(message)
if len(messages) >= 2: # Get first two messages
break
self.assertEqual(2, len(messages))
self.assertEqual('111', messages[0].id())
self.assertEqual('222', messages[1].id())
await subscription.close()After that, hopefully the previous methods should still work the same way as before. So the user would have more options. |
|
Thanks for the review @Aldraz! I like the iterator approach, it's much more pythonic. Let's do it! Since you already did the work here, would you like to send a PR to my branch so that you can get credit for it, and show up in the list of contributors for this release? :) |
|
@elena-kolevska @Aldraz I will merge this PR so we can iteratively refactor things to use the async iterator instead. This PR has always lingered on for a long while :) Let's consider this PR to be Part 1 /2 of this feature. |
|
Thanks @berndverst! :) |
* works Signed-off-by: Elena Kolevska <[email protected]> * works Signed-off-by: Elena Kolevska <[email protected]> * Sync bidi streaming and tests Signed-off-by: Elena Kolevska <[email protected]> * example fix Signed-off-by: Elena Kolevska <[email protected]> fixes typing Signed-off-by: Elena Kolevska <[email protected]> more readable example Signed-off-by: Elena Kolevska <[email protected]> linter Signed-off-by: Elena Kolevska <[email protected]> * examples fix Signed-off-by: Elena Kolevska <[email protected]> * Adds support for api token Signed-off-by: Elena Kolevska <[email protected]> * clean up Signed-off-by: Elena Kolevska <[email protected]> * Adds docs Signed-off-by: Elena Kolevska <[email protected]> * more small tweaks Signed-off-by: Elena Kolevska <[email protected]> * cleanups and tests Signed-off-by: Elena Kolevska <[email protected]> * Removes receive queue Signed-off-by: Elena Kolevska <[email protected]> * Adds `subscribe_with_handler` Signed-off-by: Elena Kolevska <[email protected]> * Fixes linter Signed-off-by: Elena Kolevska <[email protected]> * Fixes linter Signed-off-by: Elena Kolevska <[email protected]> * Adds async Signed-off-by: Elena Kolevska <[email protected]> * Adds tests for async streaming subscription Signed-off-by: Elena Kolevska <[email protected]> * Linter Signed-off-by: Elena Kolevska <[email protected]> * Split sync and async examples Signed-off-by: Elena Kolevska <[email protected]> * linter Signed-off-by: Elena Kolevska <[email protected]> * Adds interceptors to the async client for bidirectional streaming Signed-off-by: Elena Kolevska <[email protected]> * Removes unneeded class Signed-off-by: Elena Kolevska <[email protected]> * Removes async client Signed-off-by: Elena Kolevska <[email protected]> * Fixes missing docker-compose in examples (dapr#736) Signed-off-by: Elena Kolevska <[email protected]> * Removes async examples test Signed-off-by: Elena Kolevska <[email protected]> * Small cleanup Signed-off-by: Elena Kolevska <[email protected]> * Split up topic names between tests Signed-off-by: Elena Kolevska <[email protected]> * lint Signed-off-by: Elena Kolevska <[email protected]> * Revert "Removes async client" This reverts commit cb4b65b. Signed-off-by: Elena Kolevska <[email protected]> * Split up topic names between tests Signed-off-by: Elena Kolevska <[email protected]> * updates fake server to wait for confirmation message before sending new message Signed-off-by: Elena Kolevska <[email protected]> * Updates protos Signed-off-by: Elena Kolevska <[email protected]> * Adds stream cancelled error Signed-off-by: Elena Kolevska <[email protected]> * linter Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]>
Description
Adds bidirectional streaming support for pub-sub, extending the API with the
subscribeandsubscribe_handler methods.The
subscribemethod returns aSubscriptionobject, which allows users to pull messages from thestream by calling the
next_messagemethod. This will block on the main thread while waiting for messages.When done, the
closemethod should be called to terminate the subscription and stop receiving messages.The
subscribe_with_handlermethod accepts a user callback function that is executed for each messagereceived from the stream.
It runs in a separate thread, so it doesn't block the main thread. The callback function should return a
TopicEventResponseStatus, indicating whether the message was processed successfully, should beretried, or discarded. Users can return these statuses using the
Subscription.SUCCESS,Subscription.RETRY, andSubscription.DROPclass properties. The method will automatically managemessage acknowledgements based on the returned status.
The call to
subscribe_with_handlermethod returns a close function, which should be called to terminate the subscription when done.Issue reference
#730
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list:
Release Note
RELEASE NOTE: ADD Implemented bidirectional streaming for pubsub